package kafka

import "github.com/Shopify/sarama"

// 生产
type Msg struct {
	Topic   string
	Message string
}

// 生产者发布消息
func (m Msg) SendMessage() (int32, int64, error) {
	msg := &sarama.ProducerMessage{
		Topic:     m.Topic,
		Partition: kafkaConfig.Partition,
		Key:       sarama.StringEncoder(kafkaConfig.EncoderKey),
		Value:     sarama.ByteEncoder(m.Message),
	}
	return producer.SendMessage(msg)
}

// 订阅
type Sub struct {
	Topic string
	Event Event
}

// 消费者消息事件
type Event func(*sarama.ConsumerMessage)

func (s Sub) Subcribe() (err error) {
	var partitionList []int32
	partitionList, err = consumer.Partitions(s.Topic)
	if err != nil {
		return err
	}
	for p := range partitionList {
		var partitionConsumer sarama.PartitionConsumer
		partitionConsumer, err = consumer.ConsumePartition(s.Topic, int32(p), sarama.OffsetNewest)
		if err != nil {
			return err
		}
		go func() {
			for {
				s.Event(<-partitionConsumer.Messages())
			}
		}()
	}
	return nil
}
